package com.mainbo.core.activemq;

import com.mainbo.core.common.constant.Const;
import com.mainbo.core.common.exception.JmsSenderException;
import com.mainbo.core.common.exception.RsaSignException;
import com.mainbo.core.activemq.model.PfMessage;
import com.mainbo.core.activemq.model.PfP2pMessage;
import com.mainbo.core.activemq.parser.MessageMarshallers;
import com.mainbo.core.activemq.parser.MessageParseException;
import com.mainbo.core.util.Encodes;
import com.mainbo.core.util.Identities;
import com.mainbo.core.util.RSAUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.transaction.annotation.Transactional;

import javax.jms.*;
import java.io.Serializable;
import java.util.Map;

/**
 * 消息发送器，activemq 实现
 * @author moshang
 * @date 2020-03-01
 **/

@Transactional
public class JmsSenderImpl implements JmsSender, JmsP2pSender {

    private static final Logger logger = LoggerFactory
            .getLogger(JmsSenderImpl.class);

    private static final String defaultFormatType = Const.TP_JSON;

    private JmsTemplate jmsTemplate;

    private Destination destination;

    /**
     * 加密密匙
     */
    private String secretKey;

    /**
     * 发送Message消息
     *
     * @param type
     *          自定义参数类型
     * @param to
     *          接收者标识
     * @param content
     *          消息内容
     */
    @Override
    public void sendP2pMessage(String type, String to, Serializable content) {
        PfP2pMessage<Serializable> pfP2pMessage = new PfP2pMessage<Serializable>(
                type, content, to);
        sendP2pMessage(pfP2pMessage, null);
    }

    /**
     * 发送Message消息
     *
     * @param type
     *          自定义参数类型
     * @param to
     *          接收者标识
     * @param content
     *          消息内容
     * @param params
     *          消息参数
     */
    @Override
    public void sendP2pMessage(String type, String to, Serializable content,
                               Map<String, Object> params) {
        PfP2pMessage<Serializable> pfP2pMessage = new PfP2pMessage<Serializable>(
                type, content, to);
        sendP2pMessage(pfP2pMessage, params);
    }

    @Override
    public void sendP2pMessage(PfP2pMessage<Serializable> message) {
        sendP2pMessage(message, null);
    }

    @Override
    public void sendP2pMessage(final PfP2pMessage<Serializable> message,
                               final Map<String, Object> properties) {
        asserNotNullMessage(message);
        jmsTemplate.send(destination, new MessageCreator() {
            @SuppressWarnings({ "unchecked" })
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                String formatType = message.getFormatType() == null ? defaultFormatType
                        : message.getFormatType();
                if (secretKey != null && StringUtils.isNotBlank(secretKey)) { // 不为空
                    msg.setStringProperty(Const.MESSAGE_SIGN,
                            Encodes.encodeBase64(RSAUtils.sign(secretKey,
                                    Const.DEFAUT_SIGN_CONTENT.getBytes())));
                    msg.setStringProperty(Const.MESSAGE_SIGN_CONTENT,
                            Const.DEFAUT_SIGN_CONTENT);
                }
                msg.setStringProperty(Const.MESSAGE_FORMATE_NAME, formatType);
                msg.setStringProperty(Const.MESSAGE_TYPE_NAME,
                        message.getClass().getName());
                msg.setStringProperty(Const.RECEIVER_ID, message.getTo());
                addProperties(properties, msg);
                try {
                    msg.setText(MessageMarshallers
                            .getMarshaller(message.getClass(), formatType).marshall(message));
                    logger.trace("send p2p Message,from ={}, to = {}, content = {}",
                            message.getFrom(), message.getTo(), msg.getText());
                } catch (MessageParseException e) {
                    logger.error("marshaller failed, msg : {}", message);
                    throw new JmsSenderException(e);
                }

                if (message.needReply()) {
                    if (message.getCorrelationID() == null) {
                        message.setCorrelationID(Identities.uuid2());
                    }
                    msg.setJMSReplyTo(destination);
                }

                if (message.getCorrelationID() != null) {
                    msg.setJMSCorrelationID(message.getCorrelationID());
                }

                return msg;
            }
        });

    }

    @Override
    public void sendMessage(String type, Serializable content,
                            Map<String, Object> params) {
        PfMessage<Serializable> message = new PfMessage<Serializable>(type,
                content);
        sendMessage(message, params);
    }

    @Override
    public void sendMessage(String type, Serializable content) {
        PfMessage<Serializable> message = new PfMessage<Serializable>(type,
                content);
        sendMessage(message, null);

    }

    @Override
    public void sendMessage(PfMessage<Serializable> message) {
        sendMessage(message, null);

    }

    @Override
    public void sendMessage(final PfMessage<Serializable> message,
                            final Map<String, Object> properties) {
        asserNotNullMessage(message);
        jmsTemplate.send(new MessageCreator() {
            @SuppressWarnings({ "unchecked" })
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                String formatType = message.getFormatType() == null ? defaultFormatType
                        : message.getFormatType();
                msg.setStringProperty(Const.MESSAGE_FORMATE_NAME, formatType);
                msg.setStringProperty(Const.MESSAGE_TYPE_NAME,
                        message.getClass().getName());
                try {
                    if (secretKey != null && StringUtils.isNotBlank(secretKey)) { // 不为空
                        msg.setStringProperty(Const.MESSAGE_SIGN,
                                Encodes.encodeBase64(RSAUtils.sign(secretKey,
                                        Const.DEFAUT_SIGN_CONTENT.getBytes())));
                        msg.setStringProperty(Const.MESSAGE_SIGN_CONTENT,
                                Const.DEFAUT_SIGN_CONTENT);
                    }
                    msg.setText(MessageMarshallers
                            .getMarshaller(message.getClass(), formatType).marshall(message));

                    if (message.getExcludeReceiverIds() != null) {
                        msg.setStringProperty(Const.EXCLUDE_RECEIVER_IDS,
                                message.getExcludeReceiverIds());
                    }

                    logger.trace(
                            "send topic Message, from ={}, needReply = {}, content = {}",
                            message.getFrom(), message.needReply(), msg.getText());
                } catch (MessageParseException e) {
                    logger.error("marshaller failed, msg : {}", message);
                    throw new JmsSenderException(e);
                } catch (RsaSignException e) {
                    logger.error("Rsa sign failed", e);
                    throw new JmsSenderException(e);
                }

                addProperties(properties, msg);

                if (message.needReply()) {
                    message.setCorrelationID(Identities.uuid2());
                    msg.setJMSReplyTo(destination);
                }

                if (message.getCorrelationID() != null) {
                    msg.setJMSCorrelationID(message.getCorrelationID());
                }
                return msg;
            }
        });

    }

    protected void asserNotNullMessage(PfMessage<Serializable> message) {
        if (message == null) {
            throw new JmsSenderException("message can't be null");
        }
    }

    protected void addProperties(Map<String, Object> properties,
                                 TextMessage message) {
        try {
            if (properties != null) {
                for (String key : properties.keySet()) {
                    message.setObjectProperty(key, properties.get(key));
                }
            }
        } catch (JMSException e) {
            logger.error("jms addProperties error!msg:{}", e);
        }
    }

    /**
     * Getter method for property <tt>destination</tt>.
     *
     * @return property value of destination
     */
    public Destination getDestination() {
        return destination;
    }

    /**
     * Setter method for property <tt>destination</tt>.
     *
     * @param destination
     *          value to be assigned to property destination
     */
    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    /**
     * Setter method for property <tt>jmsTemplate</tt>.
     *
     * @param jmsTemplate
     *          value to be assigned to property jmsTemplate
     */
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    /**
     * Setter method for property <tt>secretKey</tt>.
     *
     * @param secretKey
     *          value to be assigned to property secretKey
     */
    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }

}
